package com.atguigu.gmall.realtime.app.dwd;

import com.atguigu.gmall.realtime.util.MyKafkaUtil;
import com.atguigu.gmall.realtime.util.MySqlUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author Felix
 * @date 2022/11/22
 * 交易域加购事实表
 * 需要启动的进程
 *      zk、kafka、maxwell、DwdTradeCartAdd
 * 执行流程
 *      运行模拟生成业务数据的jar
 *      业务数据库表会发生变化
 *      binlog会将业务数据库表的变化记录下来
 *      maxwell会从binlog中读取变化数据，并将数据封装为一个json格式字符串发送给topic_db主题中
 *      DwdTradeCartAdd从topic_db主题中读取业务数据,并创建动态表
 *      过滤出加购行为-得到加购表
 *          table='cart_info'
 *          type='insert' or (type='update' and old[sku_num] is not null and data[sku_num]>old[sku_num])
 *      从MySQL数据库中读取字典表
 *      使用LookUpJoin将加购表和字典表进行连接
 *      将连接的结果写到kafka主题中
 */
public class DwdTradeCartAdd {
    public static void main(String[] args) {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        //1.3 指定表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        /*
        //TODO 2.检查点相关的设置
        //2.1 开启检查点
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 设置检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //2.3 设置job取消后，检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 设置两个检查点之间最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        //2.5 设置重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(30),Time.seconds(3)));
        //2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop202:8020/gmall/ck");
        //2.7 设置操作hadoop的用户
        System.setProperty("HADOOP_USER_NAME","atguigu");
        */
        //TODO 3.从kafka的topic_db主题中读取业务数据 并创建动态表
        tableEnv.executeSql(MyKafkaUtil.getTopicDbDDL("dwd_trade_cart_add_group"));
        // tableEnv.executeSql("select * from topic_db").print();

        //TODO 4.过滤出加购操作 得到加购表
        Table cartAdd = tableEnv.sqlQuery("select \n" +
            "    `data`['id'] id,\n" +
            "    `data`['user_id'] user_id,\n" +
            "    `data`['sku_id'] sku_id,\n" +
            "    `data`['source_type'] source_type,\n" +
            "    if(`type`='insert',`data`['sku_num'],cast(CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT) as string)) sku_num,\n" +
            "    ts,\n" +
            "    proc_time\n" +
            "from topic_db where `table`='cart_info' and(`type`='insert' \n" +
            " or (`type`='update' and `old`['sku_num'] is not null and \n" +
            " CAST(`data`['sku_num'] AS INT) > CAST(`old`['sku_num'] AS INT)))");
        tableEnv.createTemporaryView("cart_add",cartAdd);
        // tableEnv.executeSql("select * from cart_add").print();

        //TODO 5.从mySQL数据库中读取字典表数据
        tableEnv.executeSql(MySqlUtil.getBaseDicLookUpDDL());
        // tableEnv.executeSql("select * from base_dic").print();

        //TODO 6.使用LookUpJoin将加购表和字典表进行关联
        Table joinedTable = tableEnv.sqlQuery("SELECT id,user_id,sku_id,source_type,dic_name as source_type_name,sku_num,ts\n" +
            "FROM cart_add AS c JOIN base_dic FOR SYSTEM_TIME AS OF c.proc_time AS d\n" +
            "    ON c.source_type = d.dic_code");
        tableEnv.createTemporaryView("joined_table",joinedTable);
        // tableEnv.executeSql("select * from joined_table").print();

        //TODO 7.创建动态表  和要写入的kafka主题进行映射
        tableEnv.executeSql("CREATE TABLE dwd_trade_cart_add (\n" +
            "  id string,\n" +
            "  user_id string,\n" +
            "  sku_id string,\n" +
            "  source_type string,\n" +
            "  source_type_name string,\n" +
            "  sku_num string,\n" +
            "  ts string,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") " + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_cart_add"));

        //TODO 8.将关联的结果写到kafka主题中
        tableEnv.executeSql("insert into dwd_trade_cart_add select * from joined_table");
    }
}
